[[dlt-strategies]]
= DLT Strategies

The framework provides several strategies for working with dead-letter topics (DLTs).
You can provide a method for DLT processing, use the default logging method, or have no DLT at all.
You can also choose what happens if DLT processing fails.

[[dlt-processing-method]]
== DLT Processing Method

You can specify the method used to process the DLT for the topic, as well as the behavior if that processing fails.

To do so, add the `@DltHandler` annotation to a method of the class that contains the `@RetryableTopic` annotation(s).
Note that the same method is used for all the `@RetryableTopic`-annotated methods within that class.

[source, java]
----
@RetryableTopic
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}

@DltHandler
public void processDltMessage(MyPojo message) {
    // ... DLT message processing, persistence, etc
}
----

The DLT handler method can also be provided through the `RetryTopicConfigurationBuilder.dltHandlerMethod(String, String)` method, passing as arguments the bean name and method name that should process the DLT's messages.

[source, java]
----
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .dltHandlerMethod("myCustomDltProcessor", "processDltMessage")
            .create(template);
}

@Component
public class MyCustomDltProcessor {

    private final MyDependency myDependency;

    public MyCustomDltProcessor(MyDependency myDependency) {
        this.myDependency = myDependency;
    }

    public void processDltMessage(MyPojo message) {
        // ... message processing, persistence, etc
    }
}
----

NOTE: If no DLT handler is provided, the default `RetryTopicConfigurer.LoggingDltListenerHandlerMethod` is used.

Starting with version 2.8, you can control whether the DLT container starts, independently of the container factory's `autoStartup` property.
This is useful if you do not want to consume from the DLT in this application at all (not even with the default handler), or if you want to defer consumption.

When using the `@RetryableTopic` annotation, set the `autoStartDltHandler` property to `false`; when using the configuration builder, use `autoStartDltHandler(false)`.

You can later start the DLT handler via the `KafkaListenerEndpointRegistry`.
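
The following is a minimal sketch of deferring DLT consumption and starting it later, not a definitive implementation.
The `DeferredDltConsumer` class, the listener `id`, and the `startDltContainer` helper are hypothetical names introduced for illustration.
The sketch deliberately takes the DLT container's ID as a parameter: it is commonly the listener ID followed by the DLT topic suffix, but that is an assumption you should verify against the IDs registered in your application.

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Component;

@Component
public class DeferredDltConsumer {

    private final KafkaListenerEndpointRegistry registry;

    public DeferredDltConsumer(KafkaListenerEndpointRegistry registry) {
        this.registry = registry;
    }

    // The DLT container is created but is not started with the application.
    @RetryableTopic(autoStartDltHandler = "false")
    @KafkaListener(id = "my-listener", topics = "my-annotated-topic")
    public void processMessage(MyPojo message) {
        // ... message processing
    }

    // Hypothetical helper: call it once the application is ready to consume from the DLT.
    public void startDltContainer(String dltContainerId) {
        MessageListenerContainer container = this.registry.getListenerContainer(dltContainerId);
        // getListenerContainer returns null when no container has the given ID.
        if (container != null && !container.isRunning()) {
            container.start();
        }
    }
}
```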

[[dlt-failure-behavior]]
== DLT Failure Behavior

Should the DLT processing fail, there are two possible behaviors available: `ALWAYS_RETRY_ON_ERROR` and `FAIL_ON_ERROR`.

With `ALWAYS_RETRY_ON_ERROR`, the record is forwarded back to the DLT so that it does not block the processing of other DLT records.
With `FAIL_ON_ERROR`, the consumer ends the execution without forwarding the message.

[source,java]
----
@RetryableTopic(dltStrategy = DltStrategy.FAIL_ON_ERROR)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
----

[source, java]
----
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .dltHandlerMethod("myCustomDltProcessor", "processDltMessage")
            .doNotRetryOnDltFailure()
            .create(template);
}
----

NOTE: The default behavior is `ALWAYS_RETRY_ON_ERROR`.

IMPORTANT: Starting with version 2.8.3, `ALWAYS_RETRY_ON_ERROR` does NOT route a record back to the DLT if the record causes a fatal exception to be thrown,
such as a `DeserializationException`, because such exceptions would generally be thrown again on every attempt.

Exceptions that are considered fatal are:

* `DeserializationException`
* `MessageConversionException`
* `ConversionException`
* `MethodArgumentResolutionException`
* `NoSuchMethodException`
* `ClassCastException`

You can add exceptions to and remove exceptions from this list using methods on the `DestinationTopicResolver` bean.

See xref:retrytopic/features.adoc#retry-topic-ex-classifier[Exception Classifier] for more information.
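
The rules above can be summarized with a small, framework-free model.
It is only a sketch for reasoning about the behavior, not the framework's implementation: `DltFailureModel`, its enums, and its methods are hypothetical names, the fatal list contains only the two JDK exception types from the list above, and inspecting the cause chain is an assumption of the sketch.

```java
import java.util.LinkedHashSet;
import java.util.Set;

/** Simplified model of the DLT failure decision (illustration only). */
final class DltFailureModel {

    enum Strategy { ALWAYS_RETRY_ON_ERROR, FAIL_ON_ERROR }

    enum Outcome { FORWARD_TO_DLT, STOP }

    // Hypothetical stand-in for the fatal-exception list; only JDK types are registered here.
    private final Set<Class<? extends Throwable>> fatalTypes = new LinkedHashSet<>();

    DltFailureModel() {
        fatalTypes.add(NoSuchMethodException.class);
        fatalTypes.add(ClassCastException.class);
    }

    void addFatal(Class<? extends Throwable> type) {
        fatalTypes.add(type);
    }

    void removeFatal(Class<? extends Throwable> type) {
        fatalTypes.remove(type);
    }

    Outcome decide(Strategy strategy, Throwable failure) {
        if (strategy == Strategy.FAIL_ON_ERROR) {
            return Outcome.STOP; // never forwarded again
        }
        // Assumption of this sketch: the whole cause chain is checked for fatal types.
        for (Throwable t = failure; t != null; t = t.getCause()) {
            for (Class<? extends Throwable> fatal : fatalTypes) {
                if (fatal.isInstance(t)) {
                    return Outcome.STOP; // retrying would fail the same way
                }
            }
        }
        return Outcome.FORWARD_TO_DLT;
    }

    public static void main(String[] args) {
        DltFailureModel model = new DltFailureModel();
        System.out.println(model.decide(Strategy.ALWAYS_RETRY_ON_ERROR,
                new IllegalStateException("transient failure")));
        System.out.println(model.decide(Strategy.ALWAYS_RETRY_ON_ERROR,
                new ClassCastException("unexpected payload type")));
    }
}
```

In this model, adding or removing a type changes the outcome only under `ALWAYS_RETRY_ON_ERROR`, which mirrors the role of the fatal-exception list described above.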


[[configuring-no-dlt]]
== Configuring No DLT

The framework also lets you omit the DLT for a topic.
In this case, after the retries are exhausted, processing simply ends.

[source, java]
----
@RetryableTopic(dltStrategy = DltStrategy.NO_DLT)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
----

[source, java]
----
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .doNotConfigureDlt()
            .create(template);
}
----

